package com.zeyu.framework.monitors.mysql;

import com.beust.jcommander.internal.Lists;
import com.beust.jcommander.internal.Maps;
import com.zeyu.framework.monitors.Collector;
import com.zeyu.framework.monitors.mysql.entity.MysqlStatus;
import com.zeyu.framework.monitors.mysql.entity.ProcessStatus;
import com.zeyu.framework.monitors.mysql.service.MysqlService;
import com.zeyu.framework.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.bind.RelaxedPropertyResolver;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * 执行Mysql的采集任务，将来可能会被任务调度使用，都是独立的方法
 * Created by zeyuphoenix on 16/8/26.
 */
@Component
public class MysqlCollector implements Collector, EnvironmentAware {

    // ================================================================
    // Constants
    // ================================================================

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(MysqlCollector.class);

    /**
     * database配置信息前缀
     */
    private static final String CONFIG_PREFIX = "spring.datasource.";

    // ================================================================
    // Fields
    // ================================================================

    @Autowired
    private DataSource dataSource;
    @Autowired
    private MysqlService mysqlService;

    // 最后一次采集的mysql状态
    private MysqlStatus mysqlStatus;
    // configuration
    private RelaxedPropertyResolver propertyResolver;
    // database connection info
    private String host;
    private String port;
    private String name;

    // ================================================================
    // Constructors
    // ================================================================

    // ================================================================
    // Methods from/for super Interfaces or SuperClass
    // ================================================================

    @Override
    public void collect() {
        logger.info("Mysql收集任务启动...");

        // 获取当前的状态
        mysqlStatus = getCurrentMysqlDBServerStatus();

        mysqlStatus.setTime(new Date());

        // 保存信息到数据库
        mysqlService.save(this.mysqlStatus);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.propertyResolver = new RelaxedPropertyResolver(environment,
                CONFIG_PREFIX);
        this.paramFormat();
    }

    // ================================================================
    // Public or Protected Methods
    // ================================================================

    /**
     * 获取最后一次采集的mysql状态
     */
    public MysqlStatus getMysqlStatus() {
        return this.mysqlStatus;
    }

    /**
     * 获取当前Mysql的状态
     */
    public MysqlStatus getCurrentMysqlDBServerStatus() {
        logger.debug("获取当前Mysql的状态开始..");

        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;
        // 保存所有状态
        Map<String, String> statusMap = Maps.newHashMap();
        // 获取全局状态
        try {
            // 需要保证初始化Mysql
            // 防止这是第一个启动模块，数据库连接未初始化
            conn = dataSource.getConnection();
            statement = conn.createStatement();

            resultSet = statement.executeQuery("SHOW GLOBAL STATUS;");
            // 处理结果
            transResult(resultSet, statusMap);

        } catch (SQLException e) {
            logger.error("获取连接状态错误： ", e);
        } finally {
            close(resultSet, statement, conn);
        }

        // 获取全局变量
        try {
            conn = dataSource.getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("SHOW GLOBAL VARIABLES;");
            // 处理结果
            transResult(resultSet, statusMap);

        } catch (SQLException e) {
            logger.error("获取参数状态错误： ", e);
        } finally {
            close(resultSet, statement, conn);
        }

        logger.debug("收集Mysql信息完成");
        return loadServerStatus(statusMap);
    }

    /**
     * 获取当前的数据库的连接状态
     *
     * @return 连接状态列表
     */
    public List<ProcessStatus> getProcessStatusList() {
        logger.debug("获取连接状态开始..");

        List<ProcessStatus> processStatusList = Lists.newArrayList();
        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            // 需要保证初始化Mysql
            // 防止这是第一个启动模块，数据库连接未初始化
            conn = dataSource.getConnection();
            statement = conn.createStatement();

            resultSet = statement.executeQuery("show processlist;");
            // 处理结果
            if (resultSet != null) {
                while (resultSet.next()) {
                    ProcessStatus processStatus = new ProcessStatus();
                    processStatus.setProcessId(resultSet.getLong("id"));
                    // 连接的用户名
                    processStatus.setUser(resultSet.getString("user"));
                    processStatus.setHost(resultSet.getString("host"));
                    // 连接的数据库的名称
                    processStatus.setDbName(resultSet.getString("db"));
                    processStatus.setCommand(resultSet.getString("command"));
                    processStatus.setState(resultSet.getString("state"));

                    // 执行的信息
                    processStatus.setInfo(resultSet.getString("info"));

                    processStatusList.add(processStatus);
                }
            }

        } catch (SQLException e) {
            logger.error("获取连接状态错误： ", e);
        } finally {
            close(resultSet, statement, conn);
        }

        return processStatusList;
    }

    // ================================================================
    // Getter & Setter
    // ================================================================

    // ================================================================
    // Private Methods
    // ================================================================

    /**
     * 处理查询结果
     */
    private void transResult(ResultSet resultSet, Map<String, String> statusMap) throws SQLException {

        if (resultSet != null) {
            while (resultSet.next()) {
                String key = resultSet.getString("variable_name");
                String value = resultSet.getString("value");
                if (StringUtils.isNotEmpty(key)) {
                    // 统一放在map中，便于查找
                    statusMap.put(key.toLowerCase(), value);
                }
            }
        }
    }

    /**
     * 根据全局变量加载mysql服务器状态
     *
     * @return MysqlStatus
     */
    private MysqlStatus loadServerStatus(Map<String, String> statusMap) {

        MysqlStatus serverStatus = new MysqlStatus();

        // 配置文件取得
        serverStatus.setHost(this.host);
        serverStatus.setPort(Integer.valueOf(this.port));
        serverStatus.setDbName(this.name);

        // 状态正常
        serverStatus.setStatus(true);
        // mysql位置
        serverStatus.setBaseDir(statusMap.get("basedir"));
        // 版本和主机信息
        serverStatus.setVersion(statusMap.get("version"));
        serverStatus.setHostName("hostname");

        // 运行时长(s)
        long uptime = getLongValue(statusMap.get("uptime"));
        serverStatus.setUptime(uptime);
        // pid和process
        // Linux可以取得
        serverStatus.setPid(100);
        serverStatus.setProcess("mysqld");

        // 连接信息
        serverStatus.setMaxConnections(getIntValue(statusMap
                .get("max_connections")));
        serverStatus.setThreadConnections(getIntValue(statusMap
                .get("threads_connected")));

        // QPS
        long questions = getLongValue(statusMap.get("questions"));
        serverStatus.setQps(divide(questions, uptime));
        // TPS
        // TPS = (Com_commit + Com_rollback) / seconds
        long commit = getLongValue(statusMap.get("com_commit"));
        long rollback = getLongValue(statusMap.get("com_rollback"));
        serverStatus.setTps(divide(commit + rollback, uptime));

        // Bytes_sent
        // Bytes_received
        long bytesSent = getLongValue(statusMap.get("bytes_sent"));
        long bytesReceived = getLongValue(statusMap.get("bytes_received"));
        // MySQL实例平均每秒钟的输入、输出流量，单位为KB/s
        serverStatus.setSendBytes(divide(bytesSent, uptime));
        serverStatus.setReceiveBytes(divide(bytesReceived, uptime));

        // 缓存命中率
        // key_buffer_write_hits = (1-key_writes / key_write_requests) * 100%
        // key_buffer_read_hits = (1-key_reads / key_read_requests) * 100%
        long keyWrites = getLongValue(statusMap.get("key_writes"));
        long keyReads = getLongValue(statusMap.get("key_reads"));
        long keyWriteRequests = getLongValue(statusMap
                .get("key_write_requests"));
        long keyReadRequests = getLongValue(statusMap.get("key_read_requests"));
        // 计算读命中百分比
        if (keyReadRequests == 0L || keyReads == 0L) {
            serverStatus.setKeyReadHits(0.0f);
        } else {
            serverStatus.setKeyReadHits(100.0f - divide(keyReads * 100L,
                    keyReadRequests));
        }
        // 计算写命中百分比
        if (keyWriteRequests == 0L || keyWrites == 0L) {
            serverStatus.setKeyWriteHits(0.0f);
        } else {
            serverStatus.setKeyWriteHits(100.0f - divide(keyWrites * 100L,
                    keyWriteRequests));
        }
        serverStatus.setKeyHits(serverStatus.getKeyReadHits()
                + serverStatus.getKeyWriteHits());

        serverStatus.setTime(new Date());

        return serverStatus;
    }

    /*
     * 保留2位的除法
	 */
    private float divide(long v1, long v2) {

        if (v2 == 0) {
            return 0.0f;
        }
        BigDecimal b1 = new BigDecimal(Long.toString(v1));
        BigDecimal b2 = new BigDecimal(Long.toString(v2));

        return b1.divide(b2, 2, RoundingMode.HALF_UP).floatValue();
    }

    /*
     * 转换格式
     */
    private long getLongValue(String oldValue) {
        if (StringUtils.isNotEmpty(oldValue)) {
            return Long.valueOf(oldValue.trim());
        }
        return 0L;
    }

    /*
     * 转换格式
     */
    private int getIntValue(String oldValue) {
        if (StringUtils.isNotEmpty(oldValue)) {
            return Integer.valueOf(oldValue.trim());
        }
        return 0;
    }

    /**
     * 处理配置参数
     */
    private void paramFormat() {
        /** mysql服务器登录名 */
        // username = this.propertyResolver.getProperty("username", "root");
        /** mysql服务器密码 */
        // password = this.propertyResolver.getProperty("password", "123456");

        // 获取url地址
        String url = this.propertyResolver.getProperty("url",
                "jdbc:mysql://127.0.0.1:3306/framework?useUnicode=true&characterEncoding=utf-8");
        try {
            MessageFormat formater = new MessageFormat(
                    "jdbc:mysql://{0}:{1}/{2}?{3}");
            Object[] arrs = formater.parse(url);
            /** mysql服务器主机地址 */
            host = arrs[0].toString();
            /** mysql服务器端口 */
            port = arrs[1].toString();
            /** mysql数据库名称 */
            name = arrs[2].toString();
        } catch (ParseException e) {
            host = "127.0.0.1";
            port = "3306";
            name = "framework";
        }
    }

    /**
     * 关闭数据库的连接
     *
     * @param resultSet
     *            结果集
     * @param statement
     *            查询
     * @param conn
     *            连接
     */
    private void close(ResultSet resultSet, Statement statement, Connection conn) {

        try {
            if (resultSet != null)
                resultSet.close();
        } catch (SQLException e) {
            logger.error("", e);
        }
        try {
            if (statement != null)
                statement.close();
        } catch (SQLException e) {
            logger.error("", e);
        }
        try {
            if (conn != null)
                conn.close();
        } catch (SQLException e) {
            logger.error("", e);
        }
    }

    // ================================================================
    // Inner or Anonymous Class
    // ================================================================

    // ================================================================
    // Test Methods
    // ================================================================

}
